将 MQTT 消息桥接到其他 MQTT 服务 您所在的位置:网站首页 ldap golang 连接池 将 MQTT 消息桥接到其他 MQTT 服务

将 MQTT 消息桥接到其他 MQTT 服务

2024-01-10 01:52| 来源: 网络整理| 查看: 265

将 MQTT 消息桥接到其他 MQTT 服务 ​

MQTT 桥接是一种连接多个 EMQX 集群或其他 MQTT 服务的方式。本页介绍了 EMQX 中 MQTT Sink 与 Source 的工作原理,并提供了在 Dashboard 或使用配置文件进行配置的快速入门教程。

工作原理 ​

EMQX 支持在两种主要模式下工作的 MQTT 桥接:入口(Source)和出口(Sink)。本节将详细介绍每种模式的工作原理。同时,还介绍了在这两种模式中使用的连接池。

下图展示了 EMQX 和 其他 MQTT 服务之间的数据集成的典型架构:

入口模式 ​

在 入口方向上, MQTT Soure 可以作为规则的数据源。MQTT Soure 与规则配合工作的消息流程如下:

出口模式 ​

在出口方向下,可以将规则处理结果作为消息,转发到远程 MQTT 服务器的指定主题下:

连接池 ​

EMQX 允许多个客户端同时连接到桥接的 MQTT 服务器。在创建桥接时您可以设置一个 MQTT 客户端连接池,并配置连接池大小以表明连接池中的客户端连接数。在 MQTT Sink 中启用连接池,可以充分利用服务器资源,以实现更大的消息吞吐和更好的并发性能。这对于处理高负载、高并发的场景非常重要。

由于 MQTT 协议要求连接到一个 MQTT 服务器的客户端必须具有唯一的客户端 ID,因此连接池中的每个客户端都被分配了一个唯一的客户端 ID。为了使客户端 ID 可预测,EMQX 根据以下模式自动生成客户端 ID:

bash[${ClientIDPrefix}:]${BridgeName}:${Mode}:${NodeName}:${N}片段描述${ClientIDPrefix}配置的客户端 ID 前缀。如果未设置,则省略整个第一片段。${BridgeName}桥接的名称,由用户提供。${Mode}ingress 或 egress。${NodeName}运行 MQTT 客户端的节点名称。${N}从 1 到配置的 MQTT 客户端连接池的大小的数字。在入口模式下使用连接池 ​

尽管连接池适用于入口和出口模式,但在入口模式下使用连接池需要考虑一些注意事项。当您拥有多个节点的 EMQX 集群并配置了一个入口 MQTT 桥接以从远程 MQTT 服务器订阅非共享主题时,如果连接池中的所有客户端都订阅相同的主题,它们将从远程 MQTT 服务器接收到重复的消息,这将给服务器带来压力。在这种情况下,强烈建议使用共享订阅作为一种安全措施。例如,您可以将远程 MQTT 服务器的主题配置为 $share/name1/topic1 或者在使用主题过滤器时配置为 $share/name2/topic2/#。在非共享订阅情况下,MQTT 客户端连接池将缩减为一个客户端,这意味着只有一个客户端会启动。

特性与优势 ​

MQTT 数据桥具有以下特性和优势:

广泛的兼容性:MQTT Sink 使用标准的 MQTT 协议,可以桥接到各类物联网平台,包括 AWS IoT Core、Azure IoT Hubs 等,同时也支持开源或其他行业 MQTT Broker 和物联网平台。这使得它可以与各种设备和平台进行无缝集成和通信。双向数据流:桥接支持双向数据流,可以将 EMQX 本地的消息发布到远程 MQTT 服务,并且也可以从 MQTT 服务订阅消息并在本地发布。这种双向通信能力使得不同系统之间的数据传输更加灵活和可控。灵活的主题映射:基于 MQTT 发布订阅模式,MQTT Sink 实现了灵活的主题映射。它支持为主题添加前缀,可以利用客户端的上下文信息(如客户端 ID、用户名等)动态构造主题。这种灵活性使得可以根据具体需求对消息进行定制化处理和路由。高性能:MQTT Sink 提供了性能优化功能,如连接池和共享订阅,以降低单个桥接客户端的负载情况,实现更低的桥接延迟和更高的桥接消息吞吐量。通过这些优化措施,可以提升整体系统的性能和可扩展性。消息 payload 转换:MQTT Sink 允许通过定义 SQL 规则对消息 payload 进行处理。这意味着在消息传输过程中,可以对 payload 进行数据提取、过滤、丰富和转换等操作。例如,可以从 payload 中提取实时指标,并在消息传递到 Kafka 之前进行数据转换和处理。指标监控:MQTT Sink 提供了对每个桥接运行指标的监控。可以查看消息总数、成功/失败计数、当前速率等指标,帮助用户实时监控和评估桥接的性能和健康状况。准备工作 ​

确保您已经了解以下内容:

规则 -数据集成创建连接器 ​

本节将以 EMQX 的在线 MQTT 服务器作为桥接服务器,指导您如何配置连接与桥接。

转到 Dashboard 集成 -> 连接器页面。

点击页面右上角的创建。

在连接器类型中选择 MQTT 服务,点击下一步。

输入连接器 名称,要求是大小写英文字母或数字组合,例如 my_mqtt_bridge。

进行连接相关配置。MQTT 服务地址设为 broker.emqx.io:1883,由于该服务器不需要认证,因此用户名、密码留空即可。该区域的其他字段可保留默认设置,也可根据实际场景设置。

通过入口配置或出口配置设定桥接规则。

TIP

入口配置与出口配置应至少配置一个,您可打开下方的开关进行相关配置。

入口配置(可选):配置桥接规则,将远程 MQTT 服务上的消息转发到本地;我们希望订阅 remote/topic/ingress 下的消息,可以进行如下配置:

主题:在集群工作模式下,须通过共享订阅来避免消息重复,因此填入 $share/g/remote/topic/ingress。QoS:选择 0。连接池大小:指定本地 MQTT 服务的客户端连接池的大小。在这个例子中,您可以设置为8。只要远程 MQTT 服务的主题使用共享订阅,这样的设置不会影响性能。

订阅到的消息可以在后续通过规则引擎进行处理,例如将消息转发到 EMQX 节点上的客户端。规则中可以获取以下字段:

字段名称描述topic来源消息主题server桥接连接的服务器地址retain是否保留消息,值为 falseqos消息服务质量pub_propsMQTT 5.0 消息属性对象,包含用户属性对、用户属性和其他属性pub_props.User-Property-Pairs用户属性对数组,每个包含键值对,例如 {"key":"foo", "value":"bar"}pub_props.User-Property用户属性对象,包含键值对,例如 {"foo":"bar"}pub_props.*其他包含的消息属性键值对,例如 Content-Type: JSONpayload消息内容message_received_at消息接收时间戳,单位为毫秒id消息 IDdup是否为重复消息

出口配置(可选):将本地指定 MQTT 主题下的消息发布到远程 MQTT 服务,可以理解为入口配置的反向数据流。如果我们希望将消息转发到远程 MQTT 服务 remote/topic/egress 主题中,可以进行如下配置:

主题:填入 remote/topic/egress。QoS:选择 0,或 ${qos} (跟随消息 QoS)。Retain:通过勾选确认是否以保留消息方式发布消息。消息模版:转发的消息 Payload 模板,支持使用 ${field} 语法提取数据。连接池大小:指定本地 MQTT 服务器的客户端连接池的大小。在这个例子中,您可以设置为8。

其他配置(可选),根据情况配置同步/异步模式,队列与批量等参数。

点击创建按钮完成 MQTT 连接器的创建。

创建 MQTT Sink 规则 ​

接下来将继续创建一条规则来指定需要转发至远程 MQTT 服务的数据。

转到 Dashboard 集成 -> 规则页面。

点击页面右上角的创建。

输入规则 ID my_rule,在 SQL 编辑器中输入规则,此处选择将 t/# 主题的 MQTT 消息存储至 TimescaleDB,规则 SQL 如下:

sqlSELECT * FROM "t/#"

添加动作,从动作类型下拉列表中选择 MQTT 服务,在动作下拉框选择刚刚创建的连接器。点击添加按钮将其添加到动作列表中。

回到规则创建页面,点击最下方创建按钮完成规则创建。

回到规则创建页面,点击创建按钮完成整个规则创建。

现在您已成功创建了规则,你可以点击集成 -> 规则页面看到新建的规则,同时在动作(Sink) 标签页看到新建的 MQTT Sink。

您也可以点击 集成 -> Flow 设计器查看拓扑,通过拓扑可以直观的看到,主题 t/# 下的消息在经过规则 my_rule 解析后被发送到 MQTT 服务中。

创建 MQTT Source 规则 ​

接下来将继续创建一条规则,将远程 MQTT 服务的数据转发至本地。

转到 Dashboard 集成 -> 规则页面。

点击页面右上角的创建。

输入规则 ID my_rule。

在 SQL 编辑器中输入规则,如果您想使用规则处理名为 my_mqtt_bridge 的 MQTT Source 中订阅到的消息,请在 SQL 编辑器 中输入以下语句:

注意:如果您想指定自己的 SQL 语法,请确保 SELECT 部分包含了稍后步骤中设置的消息重发布操作所需的所有字段。

sqlSELECT * FROM `$bridges/mqtt:my_mqtt_bridge`

点击 + 添加操作 按钮来定义规则触发的操作。从下拉列表中选择 消息重发布。

在 主题 和 Payload 字段中,您可以输入您想重新发布的消息的主题和 payload。例如,对于此演示,输入 t/1 和 ${.}。

点击 添加 将此动作包含在规则中。

回到 创建规则 页面,点击 创建。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有